{
 "cells": [
  {
   "cell_type": "code",
   "execution_count": 1,
   "metadata": {},
   "outputs": [],
   "source": [
    "from pyspark.sql import SparkSession "
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 2,
   "metadata": {},
   "outputs": [],
   "source": [
    "spark = SparkSession\\\n",
    "        .builder\\\n",
    "        .appName(\"KMeansExample\")\\\n",
    "        .getOrCreate()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 6,
   "metadata": {},
   "outputs": [],
   "source": [
    "documentDF = spark.createDataFrame([\n",
    "    (\"Word2Vec is an Estimator which takes sequences of words representing\".split(\" \"), ),\n",
    "    (\"The Word2VecModel transforms each document into a vector using\".split(\" \"), ),\n",
    "    (\"his vector can then be used as features for prediction\".split(\" \"), )\n",
    "], [\"text\"])  #创建测试DataFrame"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 7,
   "metadata": {},
   "outputs": [],
   "source": [
    "from pyspark.ml.feature import Word2Vec"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 8,
   "metadata": {},
   "outputs": [],
   "source": [
    "word2Vec = Word2Vec(vectorSize=3, minCount=0, inputCol=\"text\", outputCol=\"result\")"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 9,
   "metadata": {},
   "outputs": [],
   "source": [
    "model = word2Vec.fit(documentDF) "
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 10,
   "metadata": {},
   "outputs": [],
   "source": [
    "result = model.transform(documentDF) "
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 13,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "文本: [his, vector, can, then, be, used, as, features, for, prediction] => \n",
      "特征向量: [0.0018979262560606003,-0.034886953281238677,-0.03560755136422813]\n",
      "\n"
     ]
    }
   ],
   "source": [
    "for row in result.collect():                #查看转换的特征\n",
    "    text, vector = row\n",
    "print(u\"文本: [%s] => \\n特征向量: %s\\n\" % (\", \".join(text), str(vector)))\n"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 15,
   "metadata": {},
   "outputs": [],
   "source": [
    "from pyspark.ml.feature import FeatureHasher\n",
    "dataset = spark.createDataFrame([\n",
    "    (2.2, True, \"1\", \"foo\"),\n",
    "    (3.3, False, \"2\", \"bar\"),\n",
    "    (4.4, False, \"3\", \"baz\"),\n",
    "    (5.5, False, \"4\", \"foo\")\n",
    "], [\"real\", \"bool\", \"stringNum\", \"string\"])\n",
    "hasher = FeatureHasher(inputCols=[\"real\", \"bool\", \"stringNum\", \"string\"],outputCol=\"features\")"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 16,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "+----+-----+---------+------+--------------------------------------------------------+\n",
      "|real|bool |stringNum|string|features                                                |\n",
      "+----+-----+---------+------+--------------------------------------------------------+\n",
      "|2.2 |true |1        |foo   |(262144,[174475,247670,257907,262126],[2.2,1.0,1.0,1.0])|\n",
      "|3.3 |false|2        |bar   |(262144,[70644,89673,173866,174475],[1.0,1.0,1.0,3.3])  |\n",
      "|4.4 |false|3        |baz   |(262144,[22406,70644,174475,187923],[1.0,1.0,4.4,1.0])  |\n",
      "|5.5 |false|4        |foo   |(262144,[70644,101499,174475,257907],[1.0,1.0,5.5,1.0]) |\n",
      "+----+-----+---------+------+--------------------------------------------------------+\n",
      "\n"
     ]
    }
   ],
   "source": [
    "featurized = hasher.transform(dataset)\n",
    "featurized.show(truncate=False)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 23,
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/plain": [
       "False"
      ]
     },
     "execution_count": 23,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "from pyspark.ml import Transformer\n",
    "from pyspark.ml import Model\n",
    "from pyspark.ml import Estimator\n",
    "isinstance(hasher,Transformer)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 24,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "+---+--------+-------------+\n",
      "| id|category|categoryIndex|\n",
      "+---+--------+-------------+\n",
      "|  0|       a|          0.0|\n",
      "|  1|       b|          2.0|\n",
      "|  2|       c|          1.0|\n",
      "|  3|       a|          0.0|\n",
      "|  4|       a|          0.0|\n",
      "|  5|       c|          1.0|\n",
      "+---+--------+-------------+\n",
      "\n"
     ]
    }
   ],
   "source": [
    "from pyspark.ml.feature import StringIndexer\n",
    "\n",
    "df = spark.createDataFrame(\n",
    "    [(0, \"a\"), (1, \"b\"), (2, \"c\"), (3, \"a\"), (4, \"a\"), (5, \"c\")],\n",
    "    [\"id\", \"category\"])\n",
    "\n",
    "indexer = StringIndexer(inputCol=\"category\", outputCol=\"categoryIndex\")\n",
    "indexed = indexer.fit(df).transform(df)\n",
    "indexed.show()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 25,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "root\n",
      " |-- id: long (nullable = true)\n",
      " |-- category: string (nullable = true)\n",
      " |-- categoryIndex: double (nullable = false)\n",
      "\n"
     ]
    }
   ],
   "source": [
    "indexed.printSchema()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 26,
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/plain": [
       "True"
      ]
     },
     "execution_count": 26,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "isinstance(indexer,Estimator)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 27,
   "metadata": {},
   "outputs": [],
   "source": [
    "from pyspark.ml.feature import IndexToString, StringIndexer"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 28,
   "metadata": {},
   "outputs": [],
   "source": [
    "df = spark.createDataFrame(\n",
    "    [(0, \"a\"), (1, \"b\"), (2, \"c\"), (3, \"a\"), (4, \"a\"), (5, \"c\")],\n",
    "    [\"id\", \"category\"])"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 29,
   "metadata": {},
   "outputs": [],
   "source": [
    "indexer = StringIndexer(inputCol=\"category\", outputCol=\"categoryIndex\")\n",
    "model = indexer.fit(df)\n",
    "indexed = model.transform(df)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 30,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "Transformed string column 'category' to indexed column 'categoryIndex'\n",
      "+---+--------+-------------+\n",
      "| id|category|categoryIndex|\n",
      "+---+--------+-------------+\n",
      "|  0|       a|          0.0|\n",
      "|  1|       b|          2.0|\n",
      "|  2|       c|          1.0|\n",
      "|  3|       a|          0.0|\n",
      "|  4|       a|          0.0|\n",
      "|  5|       c|          1.0|\n",
      "+---+--------+-------------+\n",
      "\n"
     ]
    }
   ],
   "source": [
    "print(\"Transformed string column '%s' to indexed column '%s'\"\n",
    "      % (indexer.getInputCol(), indexer.getOutputCol()))\n",
    "indexed.show()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 31,
   "metadata": {},
   "outputs": [],
   "source": [
    "converter = IndexToString(inputCol=\"categoryIndex\", outputCol=\"originalCategory\")\n",
    "converted = converter.transform(indexed)\n"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 32,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "Transformed indexed column 'categoryIndex' back to original string column 'originalCategory' using labels in metadata\n",
      "+---+-------------+----------------+\n",
      "| id|categoryIndex|originalCategory|\n",
      "+---+-------------+----------------+\n",
      "|  0|          0.0|               a|\n",
      "|  1|          2.0|               b|\n",
      "|  2|          1.0|               c|\n",
      "|  3|          0.0|               a|\n",
      "|  4|          0.0|               a|\n",
      "|  5|          1.0|               c|\n",
      "+---+-------------+----------------+\n",
      "\n"
     ]
    }
   ],
   "source": [
    "print(\"Transformed indexed column '%s' back to original string column '%s' using \"\n",
    "      \"labels in metadata\" % (converter.getInputCol(), converter.getOutputCol()))\n",
    "converted.select(\"id\", \"categoryIndex\", \"originalCategory\").show()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 33,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "root\n",
      " |-- id: long (nullable = true)\n",
      " |-- category: string (nullable = true)\n",
      " |-- categoryIndex: double (nullable = false)\n",
      " |-- originalCategory: string (nullable = true)\n",
      "\n"
     ]
    }
   ],
   "source": [
    "converted.printSchema()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 37,
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/plain": [
       "True"
      ]
     },
     "execution_count": 37,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "isinstance(converter,Transformer)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 38,
   "metadata": {},
   "outputs": [],
   "source": [
    "from pyspark.ml.feature import OneHotEncoderEstimator"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 49,
   "metadata": {},
   "outputs": [],
   "source": [
    "df = spark.createDataFrame([\n",
    "    (0.0, 1.0),\n",
    "    (1.0, 0.0),\n",
    "    (2.0, 1.0),\n",
    "    (3.0, 2.0),\n",
    "    (2.0, 1.0),\n",
    "    (3.0, 0.0)\n",
    "], [\"category1\", \"category2\"])"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 50,
   "metadata": {},
   "outputs": [],
   "source": [
    "encoder = OneHotEncoderEstimator(inputCols=[\"category1\", \"category2\"],outputCols=[\"Vec1\", \"Vec2\"])"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 51,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "+---------+---------+-------------+-------------+\n",
      "|category1|category2|         Vec1|         Vec2|\n",
      "+---------+---------+-------------+-------------+\n",
      "|      0.0|      1.0|(3,[0],[1.0])|(2,[1],[1.0])|\n",
      "|      1.0|      0.0|(3,[1],[1.0])|(2,[0],[1.0])|\n",
      "|      2.0|      1.0|(3,[2],[1.0])|(2,[1],[1.0])|\n",
      "|      3.0|      2.0|    (3,[],[])|    (2,[],[])|\n",
      "|      2.0|      1.0|(3,[2],[1.0])|(2,[1],[1.0])|\n",
      "|      3.0|      0.0|    (3,[],[])|(2,[0],[1.0])|\n",
      "+---------+---------+-------------+-------------+\n",
      "\n"
     ]
    }
   ],
   "source": [
    "model = encoder.fit(df)\n",
    "encoded = model.transform(df)\n",
    "encoded.show()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 66,
   "metadata": {},
   "outputs": [],
   "source": [
    "from pyspark.ml.feature import ChiSqSelector\n",
    "from pyspark.ml.linalg import Vectors\n"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 75,
   "metadata": {},
   "outputs": [],
   "source": [
    "df = spark.createDataFrame([\n",
    "    (1, Vectors.dense([2.0, 3.0, 10.0, 1.0]), 1.0,),\n",
    "    (2, Vectors.dense([0.0, 1.0, 7.0, 0.0]), 0.0,),\n",
    "    (3, Vectors.dense([1.0, 2.0, 3.0, 0.1]), 0.0,)], [\"id\", \"features\", \"resulted\"])\n"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 76,
   "metadata": {},
   "outputs": [],
   "source": [
    "selector = ChiSqSelector(numTopFeatures=1, featuresCol=\"features\",\n",
    "                         outputCol=\"selectedFeatures\", labelCol=\"resulted\")"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 77,
   "metadata": {},
   "outputs": [],
   "source": [
    "result = selector.fit(df).transform(df)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 78,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "ChiSqSelector output with top 1 features selected\n",
      "+---+------------------+--------+----------------+\n",
      "| id|          features|resulted|selectedFeatures|\n",
      "+---+------------------+--------+----------------+\n",
      "|  1|[2.0,3.0,10.0,1.0]|     1.0|           [2.0]|\n",
      "|  2| [0.0,1.0,7.0,0.0]|     0.0|           [0.0]|\n",
      "|  3| [1.0,2.0,3.0,0.1]|     0.0|           [1.0]|\n",
      "+---+------------------+--------+----------------+\n",
      "\n"
     ]
    }
   ],
   "source": [
    "print(\"ChiSqSelector output with top %d features selected\" % selector.getNumTopFeatures())\n",
    "result.show()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 79,
   "metadata": {},
   "outputs": [],
   "source": [
    "from pyspark.ml.feature import VectorSlicer\n",
    "from pyspark.ml.linalg import Vectors\n",
    "from pyspark.sql.types import Row"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 80,
   "metadata": {},
   "outputs": [],
   "source": [
    "df = spark.createDataFrame([\n",
    "    Row(userFeatures=Vectors.sparse(3, {0: -2.0, 1: 2.3})),\n",
    "    Row(userFeatures=Vectors.dense([-2.0, 2.3, 0.0]))])"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 81,
   "metadata": {},
   "outputs": [],
   "source": [
    "slicer = VectorSlicer(inputCol=\"userFeatures\", outputCol=\"features\", indices=[1])"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 82,
   "metadata": {},
   "outputs": [],
   "source": [
    "output = slicer.transform(df)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 83,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "+--------------------+-------------+\n",
      "|        userFeatures|     features|\n",
      "+--------------------+-------------+\n",
      "|(3,[0,1],[-2.0,2.3])|(1,[0],[2.3])|\n",
      "|      [-2.0,2.3,0.0]|        [2.3]|\n",
      "+--------------------+-------------+\n",
      "\n"
     ]
    }
   ],
   "source": [
    "output.select(\"userFeatures\", \"features\").show()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 84,
   "metadata": {},
   "outputs": [],
   "source": [
    "df = spark.createDataFrame([(Vectors.dense([-2.0, 2.3, 0.0, 0.0, 1.0]),),\n",
    "                            (Vectors.dense([0.0, 0.0, 0.0, 0.0, 0.0]),),\n",
    "                            (Vectors.dense([0.6, -1.1, -3.0, 4.5, 3.3]),)], [\"features\"])"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 85,
   "metadata": {},
   "outputs": [],
   "source": [
    "vs=VectorSlicer(inputCol=\"features\", outputCol=\"sliced\", indices=[1, 4])"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 90,
   "metadata": {},
   "outputs": [],
   "source": [
    "data=vs.transform(df)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 95,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "+-----------------------+----------+\n",
      "|features               |sliced    |\n",
      "+-----------------------+----------+\n",
      "|[-2.0,2.3,0.0,0.0,1.0] |[2.3,1.0] |\n",
      "|[0.0,0.0,0.0,0.0,0.0]  |[0.0,0.0] |\n",
      "|[0.6,-1.1,-3.0,4.5,3.3]|[-1.1,3.3]|\n",
      "+-----------------------+----------+\n",
      "\n"
     ]
    }
   ],
   "source": [
    "data.show(truncate=False)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 99,
   "metadata": {},
   "outputs": [],
   "source": [
    "from pyspark.ml.feature import RFormula\n",
    "df = spark.createDataFrame([(1.0, 1.0, \"a\"),(0.0, 2.0, \"b\"),(0.0, 0.0, \"a\")], [\"y\", \"x\", \"s\"])"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 100,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "+---+---+---+---------+-----+\n",
      "|  y|  x|  s| features|label|\n",
      "+---+---+---+---------+-----+\n",
      "|1.0|1.0|  a|[1.0,1.0]|  1.0|\n",
      "|0.0|2.0|  b|[2.0,0.0]|  0.0|\n",
      "|0.0|0.0|  a|[0.0,1.0]|  0.0|\n",
      "+---+---+---+---------+-----+\n",
      "\n"
     ]
    }
   ],
   "source": [
    "rf = RFormula(formula=\"y ~ x + s\")\n",
    "data=model = rf.fit(df).transform(df)\n",
    "data.show()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": []
  }
 ],
 "metadata": {
  "kernelspec": {
   "display_name": "Python 2",
   "language": "python",
   "name": "python2"
  },
  "language_info": {
   "codemirror_mode": {
    "name": "ipython",
    "version": 2
   },
   "file_extension": ".py",
   "mimetype": "text/x-python",
   "name": "python",
   "nbconvert_exporter": "python",
   "pygments_lexer": "ipython2",
   "version": "2.7.16"
  }
 },
 "nbformat": 4,
 "nbformat_minor": 2
}
